package importer

import (
	"context"
	"github.com/Shopify/sarama"
	"github.com/go-kit/kit/log"
)

// ConsumerGroupHandler routes consumed Kafka messages to handlers registered
// per topic. Its Setup, Cleanup and ConsumeClaim methods have the shape of
// sarama's ConsumerGroupHandler interface.
type ConsumerGroupHandler struct {
	// logger receives the structured log output written by ConsumeClaim.
	logger   log.Logger
	// handlers maps a topic name to the handler invoked for messages on it.
	handlers map[string]TopicHandler
}

// NewConsumerGroupHandler builds a ConsumerGroupHandler that logs through
// logger and starts with an empty, non-nil set of topic handlers. Register
// handlers afterwards with AddTopicHandler.
func NewConsumerGroupHandler(logger log.Logger) *ConsumerGroupHandler {
	ch := new(ConsumerGroupHandler)
	ch.logger = logger
	ch.handlers = map[string]TopicHandler{}
	return ch
}

// AddTopicHandler registers handler as the handler for messages from topic,
// replacing any handler previously registered for the same topic.
//
// The handler map is created on first use, so registration also works on a
// ConsumerGroupHandler that was not built with NewConsumerGroupHandler
// (writing to a nil map would otherwise panic).
//
// No locking is performed here.
// NOTE(review): presumably all handlers are registered before consumption
// starts — confirm against callers.
func (ch *ConsumerGroupHandler) AddTopicHandler(topic string, handler TopicHandler) {
	if ch.handlers == nil {
		ch.handlers = make(map[string]TopicHandler)
	}
	ch.handlers[topic] = handler
}

// Setup performs no per-session initialisation: it ignores the session and
// always returns nil.
// NOTE(review): presumably present to satisfy sarama.ConsumerGroupHandler,
// which invokes it before claims are consumed — confirm.
func (ch *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup performs no per-session teardown: it ignores the session and always
// returns nil.
// NOTE(review): presumably present to satisfy sarama.ConsumerGroupHandler,
// which invokes it once the session's claims have finished — confirm.
func (ch *ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim processes the messages delivered for a single claim until the
// claim's message channel is closed or the session context is cancelled,
// whichever happens first.
//
// Every message is logged and then dispatched to the handler registered for
// its topic. Handlers are called with the session context, so they can observe
// cancellation when the session ends. A message is marked as consumed only
// when its handler returns nil; handler errors are logged and consumption
// continues with the next message.
//
// ConsumeClaim always returns nil.
func (ch *ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	ctx := sess.Context()
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				// Channel closed: the claim has been released.
				return nil
			}
			ch.dispatchMessage(ctx, sess, msg)
		case <-ctx.Done():
			// The session is ending; return promptly instead of waiting for
			// the message channel to be closed.
			return nil
		}
	}
}

// dispatchMessage logs msg, runs the handler registered for its topic (if any)
// with ctx, and marks msg on sess when the handler succeeds.
func (ch *ConsumerGroupHandler) dispatchMessage(ctx context.Context, sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
	// Logging errors are deliberately ignored: a failing logger must not
	// interrupt message consumption.
	_ = ch.logger.Log("topic", msg.Topic, "offset", msg.Offset)

	handler, exists := ch.handlers[msg.Topic]
	if !exists {
		// No handler for this topic: the message is left unmarked.
		return
	}

	if err := handler(ctx, msg); err != nil {
		// The failed message is left unmarked.
		// NOTE(review): a later successful message on the same partition will
		// presumably advance the offset past this one — confirm that skipping
		// failed messages is the intended delivery semantics.
		_ = ch.logger.Log("topic", msg.Topic, "offset", msg.Offset, "err", err)
		return
	}
	sess.MarkMessage(msg, "")
}
